/*! \file
 * \brief Thread pool core.
 *
 * This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
 *
 * Thread pools are a mechanism for asynchronous and parallel processing
 * within the same process. The pool class provides a convenient way
 * for dispatching asynchronous tasks as functions objects. The scheduling
 * of these tasks can be easily controlled by using customized schedulers.
 *
 * Copyright (c) 2005-2007 Philipp Henkel
 *
 * Use, modification, and distribution are  subject to the
 * Boost Software License, Version 1.0. (See accompanying  file
 * LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 *
 * http://threadpool.sourceforge.net
 *
 */

#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED

#include "locking_ptr.hpp"
#include "worker_thread.hpp"

#include "../task_adaptors.hpp"

#include <boost/bind/bind.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/static_assert.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/type_traits.hpp>

#include <vector>

/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost {
namespace threadpool {
namespace detail {

/*! \brief Thread pool.
 *
 * Thread pools are a mechanism for asynchronous and parallel processing
 * within the same process. The pool class provides a convenient way
 * for dispatching asynchronous tasks as functions objects. The scheduling
 * of these tasks can be easily controlled by using customized schedulers.
 * A task must not throw an exception.
 *
 * A pool_impl is DefaultConstructible and NonCopyable.
 *
 * \param Task A function object which implements the operator 'void operator()
 * (void) const'. The operator () is called by the pool to execute the task.
 * Exceptions are ignored. \param Scheduler A task container which determines
 * how tasks are scheduled. It is guaranteed that this container is accessed
 * only by one thread at a time. The scheduler shall not throw exceptions.
 *
 * \remarks The pool class is thread-safe.
 *
 * \see Tasks: task_func, prio_task_func
 * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
 */
template <typename Task,

          template <typename> class SchedulingPolicy,
          template <typename> class SizePolicy,
          template <typename> class SizePolicyController,
          template <typename> class ShutdownPolicy>
class pool_core : public enable_shared_from_this<
                      pool_core<Task, SchedulingPolicy, SizePolicy,
                                SizePolicyController, ShutdownPolicy>>,
                  private noncopyable {

public:                     // Type definitions
    typedef Task task_type; //!< Indicates the task's type.
    typedef SchedulingPolicy<task_type>
        scheduler_type; //!< Indicates the scheduler's type.
    typedef pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController,
                      ShutdownPolicy>
        pool_type; //!< Indicates the thread pool's type.
    typedef SizePolicy<pool_type>
        size_policy_type; //!< Indicates the sizer's type.
    // typedef typename size_policy_type::size_controller size_controller_type;

    typedef SizePolicyController<pool_type> size_controller_type;

    //    typedef SizePolicy<pool_type>::size_controller size_controller_type;
    typedef ShutdownPolicy<pool_type>
        shutdown_policy_type; //!< Indicates the shutdown policy's type.

    typedef worker_thread<pool_type> worker_type;

    // The task is required to be a nullary function.
    BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);

    // The task function's result type is required to be void.
    BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type>::value);

private: // Friends
    friend class worker_thread<pool_type>;

#if defined(__SUNPRO_CC) && \
    (__SUNPRO_CC <=         \
     0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
    friend class SizePolicy;
    friend class ShutdownPolicy;
#else
    friend class SizePolicy<pool_type>;
    friend class ShutdownPolicy<pool_type>;
#endif

private: // The following members may be accessed by _multiple_ threads at the
         // same time:
    volatile size_t m_worker_count;
    volatile size_t m_target_worker_count;
    volatile size_t m_active_worker_count;

private: // The following members are accessed only by _one_ thread at the same
         // time:
    scheduler_type m_scheduler;
    scoped_ptr<size_policy_type> m_size_policy; // is never null

    bool m_terminate_all_workers; // Indicates if termination of all workers was
                                  // triggered.
    std::vector<shared_ptr<worker_type>>
        m_terminated_workers; // List of workers which are terminated but not
                              // fully destructed.

private: // The following members are implemented thread-safe:
    mutable recursive_mutex m_monitor;
    mutable condition m_worker_idle_or_terminated_event; // A worker is idle or
                                                         // was terminated.
    mutable condition
        m_task_or_terminate_workers_event; // Task is available OR total worker
                                           // count should be reduced.

public:
    /// Constructor.
    pool_core()
        : m_worker_count(0), m_target_worker_count(0), m_active_worker_count(0),
          m_terminate_all_workers(false) {
        pool_type volatile& self_ref = *this;
        m_size_policy.reset(new size_policy_type(self_ref));

        m_scheduler.clear();
    }

    /// Destructor.
    ~pool_core() {}

    /*! Gets the size controller which manages the number of threads in the
     * pool. \return The size controller. \see SizePolicy
     */
    size_controller_type size_controller() {
        return size_controller_type(*m_size_policy, this->shared_from_this());
    }

    /*! Gets the number of threads in the pool.
     * \return The number of threads.
     */
    size_t size() const volatile { return m_worker_count; }

    // TODO is only called once
    void shutdown() { ShutdownPolicy<pool_type>::shutdown(*this); }

    /*! Schedules a task for asynchronous execution. The task will be executed
     * once only. \param task The task function object. It should not throw
     * execeptions. \return true, if the task could be scheduled and false
     * otherwise.
     */
    bool schedule(task_type const& task) volatile {
        locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

        if (lockedThis->m_scheduler.push(task)) {
            lockedThis->m_task_or_terminate_workers_event.notify_one();
            return true;
        }
        else {
            return false;
        }
    }

    /*! Returns the number of tasks which are currently executed.
     * \return The number of active tasks.
     */
    size_t active() const volatile { return m_active_worker_count; }

    /*! Returns the number of tasks which are ready for execution.
     * \return The number of pending tasks.
     */
    size_t pending() const volatile {
        locking_ptr<const pool_type, recursive_mutex> lockedThis(*this,
                                                                 m_monitor);
        return lockedThis->m_scheduler.size();
    }

    /*! Removes all pending tasks from the pool's scheduler.
     */
    void clear() volatile {
        locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
        lockedThis->m_scheduler.clear();
    }

    /*! Indicates that there are no tasks pending.
     * \return true if there are no tasks ready for execution.
     * \remarks This function is more efficient that the check 'pending() == 0'.
     */
    bool empty() const volatile {
        locking_ptr<const pool_type, recursive_mutex> lockedThis(*this,
                                                                 m_monitor);
        return lockedThis->m_scheduler.empty();
    }

    /*! The current thread of execution is blocked until the sum of all active
     *  and pending tasks is equal or less than a given threshold.
     * \param task_threshold The maximum number of tasks in pool and scheduler.
     */
    void wait(size_t const task_threshold = 0) const volatile {
        const pool_type* self = const_cast<const pool_type*>(this);
        recursive_mutex::scoped_lock lock(self->m_monitor);

        if (0 == task_threshold) {
            while (0 != self->m_active_worker_count ||
                   !self->m_scheduler.empty()) {
                self->m_worker_idle_or_terminated_event.wait(lock);
            }
        }
        else {
            while (task_threshold <
                   self->m_active_worker_count + self->m_scheduler.size()) {
                self->m_worker_idle_or_terminated_event.wait(lock);
            }
        }
    }

    /*! The current thread of execution is blocked until the timestamp is met
     * or the sum of all active and pending tasks is equal or less
     * than a given threshold.
     * \param timestamp The time when function returns at the latest.
     * \param task_threshold The maximum number of tasks in pool and scheduler.
     * \return true if the task sum is equal or less than the threshold, false
     * otherwise.
     */
    bool wait(xtime const& timestamp, size_t const task_threshold = 0) const
        volatile {
        const pool_type* self = const_cast<const pool_type*>(this);
        recursive_mutex::scoped_lock lock(self->m_monitor);

        if (0 == task_threshold) {
            while (0 != self->m_active_worker_count ||
                   !self->m_scheduler.empty()) {
                if (!self->m_worker_idle_or_terminated_event.timed_wait(
                        lock, timestamp))
                    return false;
            }
        }
        else {
            while (task_threshold <
                   self->m_active_worker_count + self->m_scheduler.size()) {
                if (!self->m_worker_idle_or_terminated_event.timed_wait(
                        lock, timestamp))
                    return false;
            }
        }

        return true;
    }

private:
    void terminate_all_workers(bool const wait) volatile {
        pool_type* self = const_cast<pool_type*>(this);
        recursive_mutex::scoped_lock lock(self->m_monitor);

        self->m_terminate_all_workers = true;

        m_target_worker_count = 0;
        self->m_task_or_terminate_workers_event.notify_all();

        if (wait) {
            while (m_active_worker_count > 0) {
                self->m_worker_idle_or_terminated_event.wait(lock);
            }

            for (typename std::vector<shared_ptr<worker_type>>::iterator it =
                     self->m_terminated_workers.begin();
                 it != self->m_terminated_workers.end(); ++it) {
                (*it)->join();
            }
            self->m_terminated_workers.clear();
        }
    }

    /*! Changes the number of worker threads in the pool. The resizing
     *  is handled by the SizePolicy.
     * \param threads The new number of worker threads.
     * \return true, if pool will be resized and false if not.
     */
    bool resize(size_t const worker_count) volatile {
        locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

        if (!m_terminate_all_workers) {
            m_target_worker_count = worker_count;
        }
        else {
            return false;
        }

        if (m_worker_count <= m_target_worker_count) { // increase worker count
            while (m_worker_count < m_target_worker_count) {
                try {
                    worker_thread<pool_type>::create_and_attach(
                        lockedThis->shared_from_this());
                    m_worker_count++;
                    m_active_worker_count++;
                } catch (thread_resource_error) {
                    return false;
                }
            }
        }
        else { // decrease worker count
            lockedThis->m_task_or_terminate_workers_event
                .notify_all(); // TODO: Optimize number of notified workers
        }

        return true;
    }

    // worker died with unhandled exception
    void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile {
        locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

        m_worker_count--;
        m_active_worker_count--;
        lockedThis->m_worker_idle_or_terminated_event.notify_all();

        if (m_terminate_all_workers) {
            lockedThis->m_terminated_workers.push_back(worker);
        }
        else {
            lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
        }
    }

    void worker_destructed(shared_ptr<worker_type> worker) volatile {
        locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
        m_worker_count--;
        m_active_worker_count--;
        lockedThis->m_worker_idle_or_terminated_event.notify_all();

        if (m_terminate_all_workers) {
            lockedThis->m_terminated_workers.push_back(worker);
        }
    }

    bool execute_task() volatile {
        function0<void> task;

        { // fetch task
            pool_type* lockedThis = const_cast<pool_type*>(this);
            recursive_mutex::scoped_lock lock(lockedThis->m_monitor);

            // decrease number of threads if necessary
            if (m_worker_count > m_target_worker_count) {
                return false; // terminate worker
            }

            // wait for tasks
            while (lockedThis->m_scheduler.empty()) {
                // decrease number of workers if necessary
                if (m_worker_count > m_target_worker_count) {
                    return false; // terminate worker
                }
                else {
                    m_active_worker_count--;
                    lockedThis->m_worker_idle_or_terminated_event.notify_all();
                    lockedThis->m_task_or_terminate_workers_event.wait(lock);
                    m_active_worker_count++;
                }
            }

            task = lockedThis->m_scheduler.top();
            lockedThis->m_scheduler.pop();
        }

        // call task function
        if (task) {
            task();
        }

        // guard->disable();
        return true;
    }
};

} // namespace detail
} // namespace threadpool
} // namespace boost

#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
